RocketRegressor (sktime-style) — ROCKET features + RidgeCV#
ROCKET (RandOm Convolutional KErnel Transform) turns each time series window into a large feature vector by applying many random 1D convolutions and extracting two summary stats per kernel:
\(\max\) of the convolution output
PPV = proportion of positive values
Then a fast linear model (often RidgeCV) is fit on those features.
This notebook implements a runnable reference RocketRegressor(num_kernels=..., ...) inspired by sktime’s wrapper.
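To make the two statistics concrete, here is a minimal toy computation for a single kernel (series, weights, and bias below are arbitrary illustrative values, not drawn from the actual sampler):

x = np.array([0.0, 1.0, 3.0, 2.0, -1.0, 0.5, 2.5])  # one short series
w = np.array([-1.0, 0.0, 1.0])                      # kernel weights (mean-centered)
bias = 0.1
conv = np.correlate(x, w, mode="valid") + bias      # sliding dot products
print("conv:", conv)               # [ 3.1  1.1 -3.9 -1.4  3.6]
print("max :", conv.max())         # 3.6
print("ppv :", (conv > 0).mean())  # 0.6 (3 of 5 outputs are positive)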
Why ROCKET works (intuition)#
Random convolutions act like a large, diverse set of pattern detectors:
short/long kernels react to different scales
dilation creates multi-scale receptive fields
max captures presence of a pattern; PPV captures how often it’s present
With enough random kernels, a linear model can fit surprisingly rich functions.
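A quick sketch of the dilation point above: a kernel of length \(k\) with dilation \(d\) spans \((k-1)\cdot d + 1\) timepoints, so doubling the dilation roughly doubles the scale the kernel sees (the kernel length here is an arbitrary choice for illustration):

# Receptive field of a dilated kernel: (k - 1) * dilation + 1 timepoints.
k = 9
for dilation in (1, 2, 4, 8):
    print(f"k={k}, dilation={dilation}: receptive field = {(k - 1) * dilation + 1}")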
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio
from sklearn.linear_model import RidgeCV, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from scipy import stats
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"
rng = np.random.default_rng(7)
import numpy, pandas, sklearn, scipy, plotly
print("numpy:", numpy.__version__)
print("pandas:", pandas.__version__)
print("sklearn:", sklearn.__version__)
print("scipy:", scipy.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
sklearn: 1.6.0
scipy: 1.15.0
plotly: 6.5.2
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
"""Accept (n, m) or (n, d, m). Return (n, d, m)."""
X = np.asarray(X, dtype=float)
if X.ndim == 2:
return X[:, None, :]
if X.ndim == 3:
return X
raise ValueError(f"X must be 2D or 3D, got shape={X.shape}")
def _acf(x: np.ndarray, max_lag: int) -> tuple[np.ndarray, np.ndarray]:
    """Sample autocorrelation of x at lags 0..max_lag (normalized so ACF(0) = 1)."""
    x = np.asarray(x, dtype=float)
x = x - x.mean()
denom = float(np.dot(x, x))
lags = np.arange(max_lag + 1)
values = np.zeros(max_lag + 1)
values[0] = 1.0
if denom == 0.0:
return lags, values
for k in range(1, max_lag + 1):
values[k] = float(np.dot(x[k:], x[:-k]) / denom)
return lags, values
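A one-line sanity check of the helper (toy sine with period 20; values are computed, not assumed):

# A pure sine correlates strongly with itself one full period later.
_lags, _vals = _acf(np.sin(2 * np.pi * np.arange(200) / 20), max_lag=20)
print(round(_vals[20], 3))  # 0.9: high positive ACF at lag 20 (shrinks with overlap length)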
def make_sliding_windows(y: np.ndarray, window_length: int) -> tuple[np.ndarray, np.ndarray]:
    """Reduce a series to supervised pairs: rows of window_length lags -> next value."""
    y = np.asarray(y, dtype=float)
L = int(window_length)
    if y.size <= L:
        raise ValueError(f"y must be longer than window_length={L}, got length {y.size}")
X = np.column_stack([y[i : y.size - L + i] for i in range(L)])
y_next = y[L:]
return X, y_next
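A tiny check of the reduction on an integer ramp (toy input, window of 3):

# Each row of X holds the previous 3 values; y_next is the value that follows.
X_demo, y_demo = make_sliding_windows(np.arange(8.0), window_length=3)
print(X_demo)   # first row [0. 1. 2.], last row [4. 5. 6.]
print(y_demo)   # [3. 4. 5. 6. 7.]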
class RocketTransformer:
"""ROCKET feature map: random dilated convolutions -> [max, ppv] features per kernel."""
def __init__(
self,
*,
num_kernels: int = 500,
kernel_sizes: tuple[int, ...] = (7, 9, 11),
random_state: int | None = None,
):
self.num_kernels = int(num_kernels)
self.kernel_sizes = tuple(int(k) for k in kernel_sizes)
self.random_state = random_state
self.kernels_: list[dict] | None = None
self.n_timepoints_: int | None = None
self.n_dims_: int | None = None
def fit(self, X, y=None):
X3 = _as_3d_panel(X)
n, d, m = X3.shape
self.n_timepoints_ = int(m)
self.n_dims_ = int(d)
r = np.random.default_rng(self.random_state)
kernels: list[dict] = []
for _ in range(self.num_kernels):
k = int(r.choice(self.kernel_sizes))
dim = int(r.integers(0, d))
# Dilation: choose from powers of two up to what fits
if m <= k:
dilation = 1
else:
max_d = (m - 1) // (k - 1)
max_pow = int(np.floor(np.log2(max_d))) if max_d > 0 else 0
dilation = int(2 ** r.integers(0, max_pow + 1))
padding = 0
if bool(r.integers(0, 2)):
padding = int(((k - 1) * dilation) // 2)
weights = r.normal(0.0, 1.0, size=k)
weights = weights - weights.mean()
bias = float(r.uniform(-1.0, 1.0))
kernels.append(
{
"length": k,
"weights": weights.astype(float),
"bias": bias,
"dilation": dilation,
"padding": padding,
"dim": dim,
}
)
self.kernels_ = kernels
return self
def transform(self, X) -> np.ndarray:
if self.kernels_ is None or self.n_timepoints_ is None or self.n_dims_ is None:
raise RuntimeError("Call fit() before transform().")
X3 = _as_3d_panel(X)
n, d, m = X3.shape
if m != self.n_timepoints_ or d != self.n_dims_:
raise ValueError(
f"X must have shape (n,{self.n_dims_},{self.n_timepoints_}) (or (n,{self.n_timepoints_})), got {X3.shape}"
)
features = np.empty((n, 2 * len(self.kernels_)), dtype=float)
for i, kinfo in enumerate(self.kernels_):
k = int(kinfo["length"])
w = kinfo["weights"]
b = float(kinfo["bias"])
dilation = int(kinfo["dilation"])
padding = int(kinfo["padding"])
dim = int(kinfo["dim"])
x = X3[:, dim, :]
if padding > 0:
x = np.pad(x, ((0, 0), (padding, padding)), mode="constant")
m_pad = x.shape[1]
out_len = m_pad - dilation * (k - 1)
if out_len <= 0:
# Degenerate: no valid positions, fall back to zeros
max_val = np.zeros(n)
ppv = np.zeros(n)
else:
starts = np.arange(out_len)[:, None]
idx = starts + dilation * np.arange(k)[None, :]
seg = x[:, idx] # (n, out_len, k)
conv = np.tensordot(seg, w, axes=([2], [0])) + b # (n, out_len)
max_val = conv.max(axis=1)
ppv = (conv > 0).mean(axis=1)
features[:, 2 * i] = max_val
features[:, 2 * i + 1] = ppv
return features
def fit_transform(self, X, y=None) -> np.ndarray:
return self.fit(X, y=y).transform(X)
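Before wiring the transformer into a regressor, a quick shape check with deliberately small, arbitrary settings:

# n series of length m should map to (n, 2 * num_kernels): one max and one PPV per kernel.
_demo_X = np.random.default_rng(0).normal(size=(4, 100))
_demo_Phi = RocketTransformer(num_kernels=50, random_state=0).fit_transform(_demo_X)
print(_demo_Phi.shape)  # (4, 100)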
class RocketRegressor:
"""ROCKET + RidgeCV regressor (sktime-style wrapper)."""
def __init__(
self,
*,
num_kernels: int = 500,
kernel_sizes: tuple[int, ...] = (7, 9, 11),
alphas: np.ndarray | None = None,
random_state: int | None = None,
):
self.num_kernels = int(num_kernels)
self.kernel_sizes = tuple(int(k) for k in kernel_sizes)
self.alphas = np.asarray(alphas, dtype=float) if alphas is not None else np.logspace(-3, 3, 13)
self.random_state = random_state
self.transformer_ = RocketTransformer(
num_kernels=self.num_kernels,
kernel_sizes=self.kernel_sizes,
random_state=self.random_state,
)
self.regressor_ = RidgeCV(alphas=self.alphas)
def fit(self, X, y):
y = np.asarray(y, dtype=float)
Phi = self.transformer_.fit_transform(X)
self.regressor_.fit(Phi, y)
return self
def predict(self, X) -> np.ndarray:
Phi = self.transformer_.transform(X)
return self.regressor_.predict(Phi)
Demo: one-step and recursive forecasts via sliding windows#
We generate a multi-seasonal series, build a sliding-window regression dataset, and compare:
RocketRegressor
a baseline Ridge on raw lag features
def simulate_ar1_noise(n: int, *, phi: float, sigma: float, rng: np.random.Generator) -> np.ndarray:
eps = rng.normal(0.0, sigma, size=n)
u = np.zeros(n)
for t in range(1, n):
u[t] = phi * u[t - 1] + eps[t]
return u
n = 650
idx = pd.date_range("2021-01-01", periods=n, freq="D")
t = np.arange(n)
weekly = 2.0 * np.sin(2 * np.pi * t / 7) + 0.6 * np.cos(2 * np.pi * t / 7)
monthly = 1.2 * np.sin(2 * np.pi * t / 30) - 0.4 * np.cos(2 * np.pi * t / 30)
trend = 0.015 * t
noise = simulate_ar1_noise(n, phi=0.7, sigma=0.7, rng=rng)
y = 40.0 + trend + weekly + monthly + noise
y = pd.Series(y, index=idx, name="y")
fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index, y=y, name="y", line=dict(color="black")))
fig.update_layout(title="Synthetic multi-seasonal series", xaxis_title="date", yaxis_title="value")
fig.show()
L = 60
X, y_next = make_sliding_windows(y.to_numpy(), window_length=L)
h = 120
X_train, y_train = X[:-h], y_next[:-h]
X_test, y_test = X[-h:], y_next[-h:]
t_test = y.index[-h:]
rocket = RocketRegressor(num_kernels=600, random_state=7)
rocket.fit(X_train, y_train)
pred_rocket = rocket.predict(X_test)
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)
pred_ridge = ridge.predict(X_test)
def summarize(name: str, y_true: np.ndarray, y_pred: np.ndarray) -> None:
mae = mean_absolute_error(y_true, y_pred)
    # The `squared=False` keyword was removed in scikit-learn 1.6; take the root explicitly.
    rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
r2 = r2_score(y_true, y_pred)
print(f"{name}: MAE={mae:.3f} RMSE={rmse:.3f} R^2={r2:.3f}")
summarize("ROCKET+RidgeCV", y_test, pred_rocket)
summarize("Ridge on lags", y_test, pred_ridge)
print("Chosen alpha (ROCKET RidgeCV):", float(rocket.regressor_.alpha_))
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_test, name="actual", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_test, y=pred_rocket, name="ROCKET+RidgeCV", line=dict(color="#59A14F")))
fig.add_trace(go.Scatter(x=t_test, y=pred_ridge, name="Ridge(lags)", line=dict(color="#E15759", dash="dot")))
fig.update_layout(title="One-step-ahead predictions", xaxis_title="date", yaxis_title="value")
fig.show()
def recursive_forecast(model, history: np.ndarray, steps: int, window_length: int) -> np.ndarray:
    """Forecast multiple steps by feeding each one-step prediction back into the window."""
    history = np.asarray(history, dtype=float).tolist()
preds = []
for _ in range(int(steps)):
x = np.asarray(history[-window_length:], dtype=float)[None, :]
y_hat = float(model.predict(x)[0])
preds.append(y_hat)
history.append(y_hat)
return np.asarray(preds)
h2 = 45
start = len(y) - h
hist = y.to_numpy()[:start]
truth = y.to_numpy()[start : start + h2]
t_h2 = y.index[start : start + h2]
rec = recursive_forecast(rocket, history=hist, steps=h2, window_length=L)
fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index[start - 120 : start], y=y.to_numpy()[start - 120 : start], name="history", line=dict(color="rgba(0,0,0,0.35)")))
fig.add_trace(go.Scatter(x=t_h2, y=truth, name="truth", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_h2, y=rec, name="recursive forecast", line=dict(color="#59A14F")))
fig.update_layout(title="Recursive multi-step forecast (reduction)", xaxis_title="date", yaxis_title="value")
fig.show()
# Residual diagnostics (ROCKET, test horizon)
resid = y_test - pred_rocket
print("residual mean:", float(np.mean(resid)))
print("residual std:", float(np.std(resid, ddof=1)))
print("Jarque-Bera:", stats.jarque_bera(resid))
lags, acf_vals = _acf(resid, max_lag=30)
bound = 1.96 / np.sqrt(resid.size)
# QQ data
nq = resid.size
p = (np.arange(1, nq + 1) - 0.5) / nq
theoretical = stats.norm.ppf(p)
sample_q = np.sort((resid - resid.mean()) / resid.std(ddof=1))
fig = make_subplots(
rows=2,
cols=2,
subplot_titles=("Residuals over time", "Residual histogram", "Residual ACF", "QQ plot (std residuals)"),
)
fig.add_trace(go.Scatter(x=t_test, y=resid, name="residuals", line=dict(color="#59A14F")), row=1, col=1)
fig.add_hline(y=0, line=dict(color="black", dash="dash"), row=1, col=1)
fig.add_trace(go.Histogram(x=resid, nbinsx=25, name="hist", marker_color="#59A14F"), row=1, col=2)
fig.add_trace(go.Bar(x=lags, y=acf_vals, name="ACF(resid)", marker_color="#59A14F"), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[bound, bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[-bound, -bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=theoretical, y=sample_q, mode="markers", name="QQ", marker=dict(color="#59A14F")), row=2, col=2)
fig.add_trace(
go.Scatter(x=[theoretical.min(), theoretical.max()], y=[theoretical.min(), theoretical.max()], mode="lines", line=dict(color="black", dash="dash"), showlegend=False),
row=2,
col=2,
)
fig.update_layout(height=750, title="Residual diagnostics (ROCKET)")
fig.show()
residual mean: 0.41093602627152837
residual std: 0.7574081863879396
Jarque-Bera: SignificanceResult(statistic=3.633108641106042, pvalue=0.1625850026815621)